From 2874b68e64e8c6b0e20933c922b349f2d29ba194 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 28 Jul 2014 21:13:30 -0700 Subject: [PATCH] Make the dependency queue entirely generic This paves the way for future intra-package parallelism as opposed to just inter-package parallelism. --- src/cargo/ops/cargo_rustc/job_queue.rs | 32 +++++----- src/cargo/util/dependency_queue.rs | 84 ++++++++++++-------------- src/cargo/util/mod.rs | 1 + 3 files changed, 59 insertions(+), 58 deletions(-) diff --git a/src/cargo/ops/cargo_rustc/job_queue.rs b/src/cargo/ops/cargo_rustc/job_queue.rs index 4eb5a516e..1c09b4dcb 100644 --- a/src/cargo/ops/cargo_rustc/job_queue.rs +++ b/src/cargo/ops/cargo_rustc/job_queue.rs @@ -4,20 +4,20 @@ use term::color::YELLOW; use core::{Package, PackageId, Resolve}; use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness}; -use util::{CargoResult, profile}; +use util::{CargoResult, Dependency, profile}; use super::job::Job; pub struct JobQueue<'a, 'b> { pool: TaskPool, - queue: DependencyQueue<'a, (&'a Package, (Job, Job))>, + queue: DependencyQueue<&'a PackageId, (&'a Package, (Job, Job))>, tx: Sender, rx: Receiver, active: HashMap<&'a PackageId, uint>, config: &'b mut Config<'b>, } -type Message = (PackageId, Freshness, CargoResult>); +type Message = (PackageId, CargoResult>); impl<'a, 'b> JobQueue<'a, 'b> { pub fn new(config: &'b mut Config<'b>, @@ -27,12 +27,10 @@ impl<'a, 'b> JobQueue<'a, 'b> { let (tx, rx) = channel(); let mut queue = DependencyQueue::new(); for &(pkg, _, _) in jobs.iter() { - queue.register(pkg); + queue.register(pkg.get_package_id()); } for (pkg, fresh, job) in jobs.move_iter() { - let mut deps = resolve.deps(pkg.get_package_id()) - .move_iter().flat_map(|a| a); - queue.enqueue(pkg, deps.collect(), fresh, (pkg, job)); + queue.enqueue(&resolve, fresh, pkg.get_package_id(), (pkg, job)); } JobQueue { @@ -59,18 +57,18 @@ impl<'a, 'b> JobQueue<'a, 'b> { while self.queue.len() > 0 { loop { match self.queue.dequeue() { - Some((id, Fresh, (pkg, (_, fresh)))) => { + Some((Fresh, id, (pkg, (_, fresh)))) => { assert!(self.active.insert(id, 1u)); try!(self.config.shell().status("Fresh", pkg)); - self.tx.send((id.clone(), Fresh, Ok(Vec::new()))); + self.tx.send((id.clone(), Ok(Vec::new()))); try!(fresh.run()); } - Some((id, Dirty, (pkg, (dirty, _)))) => { + Some((Dirty, id, (pkg, (dirty, _)))) => { assert!(self.active.insert(id, 1)); try!(self.config.shell().status("Compiling", pkg)); let my_tx = self.tx.clone(); let id = id.clone(); - self.pool.execute(proc() my_tx.send((id, Dirty, dirty.run()))); + self.pool.execute(proc() my_tx.send((id, dirty.run()))); } None => break, } @@ -79,7 +77,7 @@ impl<'a, 'b> JobQueue<'a, 'b> { // Now that all possible work has been scheduled, wait for a piece // of work to finish. If any package fails to build then we stop // scheduling work as quickly as possibly. - let (id, fresh, result) = self.rx.recv(); + let (id, result) = self.rx.recv(); let id = self.active.iter().map(|(&k, _)| k).find(|&k| k == &id) .unwrap(); *self.active.get_mut(&id) -= 1; @@ -90,12 +88,12 @@ impl<'a, 'b> JobQueue<'a, 'b> { let my_tx = self.tx.clone(); let my_id = id.clone(); self.pool.execute(proc() { - my_tx.send((my_id, fresh, job.run())); + my_tx.send((my_id, job.run())); }); } if *self.active.get(&id) == 0 { self.active.remove(&id); - self.queue.finish(id, fresh); + self.queue.finish(&id); } } Err(e) => { @@ -119,3 +117,9 @@ impl<'a, 'b> JobQueue<'a, 'b> { Ok(()) } } + +impl<'a> Dependency<&'a PackageId, &'a Resolve> for &'a PackageId { + fn dependencies(&self, resolve: &&'a Resolve) -> Vec<&'a PackageId> { + resolve.deps(*self).move_iter().flat_map(|a| a).collect() + } +} diff --git a/src/cargo/util/dependency_queue.rs b/src/cargo/util/dependency_queue.rs index c65bbcd64..450cd7e61 100644 --- a/src/cargo/util/dependency_queue.rs +++ b/src/cargo/util/dependency_queue.rs @@ -5,32 +5,31 @@ //! it to figure out when a dependency should be built. use std::collections::{HashMap, HashSet}; +use std::hash::Hash; -use core::{Package, PackageId}; - -pub struct DependencyQueue<'a, T> { - /// A list of all known packages to build. +pub struct DependencyQueue { + /// A list of all known keys to build. /// /// The value of the hash map is list of dependencies which still need to be /// built before the package can be built. Note that the set is dynamically /// updated as more dependencies are built. - pkgs: HashMap<&'a PackageId, (HashSet<&'a PackageId>, T)>, + dep_map: HashMap, V)>, /// A reverse mapping of a package to all packages that depend on that /// package. /// /// This map is statically known and does not get updated throughout the /// lifecycle of the DependencyQueue. - reverse_dep_map: HashMap<&'a PackageId, HashSet<&'a PackageId>>, + reverse_dep_map: HashMap>, /// A set of dirty packages. /// /// Packages may become dirty over time if their dependencies are rebuilt. - dirty: HashSet<&'a PackageId>, + dirty: HashSet, /// The packages which are currently being built, waiting for a call to /// `finish`. - pending: HashSet<&'a PackageId>, + pending: HashMap, } /// Indication of the freshness of a package. @@ -43,88 +42,85 @@ pub enum Freshness { Dirty, } -impl<'a, T> DependencyQueue<'a, T> { +/// A trait for discovering the dependencies of a piece of data. +pub trait Dependency: Hash + Eq + Clone { + fn dependencies(&self, cx: &C) -> Vec; +} + +impl, V> DependencyQueue { /// Creates a new dependency queue with 0 packages. - pub fn new() -> DependencyQueue<'a, T> { + pub fn new() -> DependencyQueue { DependencyQueue { - pkgs: HashMap::new(), + dep_map: HashMap::new(), reverse_dep_map: HashMap::new(), dirty: HashSet::new(), - pending: HashSet::new(), + pending: HashMap::new(), } } /// Registers a package with this queue. /// /// Only registered packages will be returned from dequeue(). - pub fn register(&mut self, pkg: &'a Package) { - self.reverse_dep_map.insert(pkg.get_package_id(), HashSet::new()); + pub fn register(&mut self, step: K) { + self.reverse_dep_map.insert(step, HashSet::new()); } /// Adds a new package to this dependency queue. /// /// It is assumed that any dependencies of this package will eventually also /// be added to the dependency queue. - pub fn enqueue(&mut self, pkg: &'a Package, deps: Vec<&'a PackageId>, - fresh: Freshness, data: T) { + pub fn enqueue(&mut self, cx: &C, fresh: Freshness, key: K, value: V) { // ignore self-deps - if self.pkgs.contains_key(&pkg.get_package_id()) { return } + if self.dep_map.contains_key(&key) { return } if fresh == Dirty { - self.dirty.insert(pkg.get_package_id()); + self.dirty.insert(key.clone()); } let mut my_dependencies = HashSet::new(); - for &dep in deps.iter() { - if dep == pkg.get_package_id() { continue } + for dep in key.dependencies(cx).move_iter() { + if dep == key { continue } // skip deps which were filtered out as part of resolve if !self.reverse_dep_map.find(&dep).is_some() { continue } - assert!(my_dependencies.insert(dep)); + assert!(my_dependencies.insert(dep.clone())); let rev = self.reverse_dep_map.find_or_insert(dep, HashSet::new()); - assert!(rev.insert(pkg.get_package_id())); + assert!(rev.insert(key.clone())); } - assert!(self.pkgs.insert(pkg.get_package_id(), - (my_dependencies, data))); + assert!(self.dep_map.insert(key, (my_dependencies, value))); } /// Dequeues a package that is ready to be built. /// /// A package is ready to be built when it has 0 un-built dependencies. If /// `None` is returned then no packages are ready to be built. - pub fn dequeue(&mut self) -> Option<(&'a PackageId, Freshness, T)> { - let pkg = match self.pkgs.iter() - .find(|&(_, &(ref deps, _))| deps.len() == 0) - .map(|(name, _)| *name) { - Some(pkg) => pkg, + pub fn dequeue(&mut self) -> Option<(Freshness, K, V)> { + let key = match self.dep_map.iter() + .find(|&(_, &(ref deps, _))| deps.len() == 0) + .map(|(key, _)| key.clone()) { + Some(key) => key, None => return None }; - let (_, data) = self.pkgs.pop(&pkg).unwrap(); - self.pending.insert(pkg); - let fresh = if self.dirty.contains(&pkg) {Dirty} else {Fresh}; - Some((pkg, fresh, data)) + let (_, data) = self.dep_map.pop(&key).unwrap(); + let fresh = if self.dirty.contains(&key) {Dirty} else {Fresh}; + self.pending.insert(key.clone(), fresh); + Some((fresh, key, data)) } /// Returns the number of remaining packages to be built. pub fn len(&self) -> uint { - self.pkgs.len() + self.pending.len() + self.dep_map.len() + self.pending.len() } /// Indicate that a package has been built. /// /// This function will update the dependency queue with this information, /// possibly allowing the next invocation of `dequeue` to return a package. - /// - /// The `fresh` parameter is used to indicate whether the package was - /// actually rebuilt or not. If no action was taken, then the parameter - /// should be `Fresh`. If a package was rebuilt, `Dirty` should be - /// specified, and the dirtiness will be propagated properly to all - /// dependencies. - pub fn finish(&mut self, pkg: &'a PackageId, fresh: Freshness) { - assert!(self.pending.remove(&pkg)); - let reverse_deps = match self.reverse_dep_map.find(&pkg) { + pub fn finish(&mut self, key: &K) { + let fresh = self.pending.pop(key).unwrap(); + let reverse_deps = match self.reverse_dep_map.find(key) { Some(deps) => deps, None => return, }; @@ -132,7 +128,7 @@ impl<'a, T> DependencyQueue<'a, T> { if fresh == Dirty { self.dirty.insert(dep.clone()); } - assert!(self.pkgs.get_mut(dep).mut0().remove(&pkg)); + assert!(self.dep_map.get_mut(dep).mut0().remove(key)); } } } diff --git a/src/cargo/util/mod.rs b/src/cargo/util/mod.rs index f1da499f4..6b1aa72b2 100644 --- a/src/cargo/util/mod.rs +++ b/src/cargo/util/mod.rs @@ -8,6 +8,7 @@ pub use self::paths::realpath; pub use self::hex::{to_hex, short_hash}; pub use self::pool::TaskPool; pub use self::dependency_queue::{DependencyQueue, Fresh, Dirty, Freshness}; +pub use self::dependency_queue::Dependency; pub use self::graph::Graph; pub use self::to_url::ToUrl; -- 2.30.2